﻿/**

 * Copyright (c) 2015-2016, FastDev 刘强 (fastdev@163.com) & Quincy.

 *

 * Licensed under the Apache License, Version 2.0 (the "License");

 * you may not use this file except in compliance with the License.

 * You may obtain a copy of the License at

 *

 *      http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

using OF.DistributeService.Core.Common;
using OF.Notify.Client;
using OF.Notify.Entity;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace OF.Notify.DataHost.Cluster
{
    /// <summary>
    /// Holds the state kept on one data node for a single (topic, consumer) pair:
    /// the persisted tree of processed collection ids, the set of collections that
    /// are currently being processed, and the online message links that a dedicated
    /// worker thread keeps visiting until they are finished.
    /// </summary>
    public class ConsumerTopicDataNode
    {
        // Pause, in milliseconds, between two passes of the online-message worker loop.
        private const int OnlineMessagePollIntervalMs = 20;

        // Tree of processed collection ids, restored from storage in the constructor.
        public IdTreeNode processedMergeTree = null;
        // Merge context handed to every MergeNode call on processedMergeTree.
        public MergeContext processedMergeContext = null;
        // Session used to restore the processed tree and to save its changed nodes.
        public MergeSession processedMergeSession = null;

        internal byte topicEnum;
        internal TopicDataNode topicDataNode = null;
        internal DataNode dataNode = null;
        internal byte consumerEnum;
        internal ConsumerBase consumer = null;
        internal ConsumerTopicClusterContext consumerContext = null;
        // Online message links waiting to be visited, keyed by collection id.
        internal ConcurrentDictionary<int, VisitHeadTailLink> onlineMessageListDict = new ConcurrentDictionary<int, VisitHeadTailLink>();
        // Collection ids currently claimed for processing; the bool value is unused.
        internal ConcurrentDictionary<int, bool> onProcessingCollectionDict = new ConcurrentDictionary<int, bool>();

        /// <summary>
        /// Resolves the consumer and its context from <paramref name="topicContext"/>,
        /// restores the processed tree from the consumer's storage path for
        /// <paramref name="dataNode"/>, and starts the thread that runs
        /// <see cref="HandleOnlineMessage"/>.
        /// </summary>
        /// <param name="topicEnum">Topic identifier sent along with finish notifications.</param>
        /// <param name="consumerEnum">Consumer identifier used to look up the consumer and its context.</param>
        /// <param name="topicContext">Source of the consumer and the consumer context.</param>
        /// <param name="dataNode">Data node whose id selects the processed-tree storage path.</param>
        /// <param name="topicDataNode">Topic-level node that supplies messages and the disposed flag.</param>
        public ConsumerTopicDataNode(byte topicEnum, byte consumerEnum, TopicClusterContext topicContext, DataNode dataNode, TopicDataNode topicDataNode)
        {
            this.topicEnum = topicEnum;
            this.consumerEnum = consumerEnum;
            this.dataNode = dataNode;
            this.topicDataNode = topicDataNode;
            this.consumerContext = topicContext.GetConsumerContext(consumerEnum);
            this.consumer = topicContext.GetConsumer(consumerEnum);

            int[] calArray = IdTreeNode.GetCalArray(consumerContext.GetProcessedCountArray());
            string_or_path_placeholder_removed: ;
            this.processedMergeSession = new MergeSession(topicContext.GetConsumerContext(consumerEnum).GetProcessedTreeStoragePath(dataNode.Id), calArray);
            this.processedMergeContext = new MergeContext(processedMergeSession);
            this.processedMergeTree = IdTreeNode.Restore(processedMergeSession, ClusterContext.Get().GetProcessedMaxValue());

            // Started last so the worker only ever observes fully initialised fields.
            Thread worker = new Thread(new ThreadStart(HandleOnlineMessage));
            worker.Start();
        }

        /// <summary>
        /// Returns the ids from <c>request.unProcessedCollections</c> that
        /// <see cref="IsCollectionProcessed"/> reports as processed.
        /// </summary>
        /// <param name="request">Request carrying the collection ids to check.</param>
        /// <returns>The processed ids, in request order; empty when the request list is null.</returns>
        public List<int> GetProcessedCollectionList(SendGetUnProcessedFinishedCollectionsRequest request)
        {
            List<int> processedCollectionList = new List<int>();
            if (request.unProcessedCollections == null)
            {
                return processedCollectionList;
            }

            foreach (var collectionId in request.unProcessedCollections)
            {
                if (IsCollectionProcessed(collectionId))
                {
                    processedCollectionList.Add(collectionId);
                }
            }
            return processedCollectionList;
        }

        /// <summary>
        /// Returns the ids from <c>request.unProcessedCollections</c> that are
        /// currently present in <see cref="onProcessingCollectionDict"/>.
        /// </summary>
        /// <param name="request">Request carrying the collection ids to check.</param>
        /// <returns>The in-progress ids, in request order; empty when the request list is null.</returns>
        public List<int>  GetProcessingCollectionList(SendGetUnProcessedFinishedCollectionsRequest request)
        {
            List<int> processingCollectionList = new List<int>();
            if (request.unProcessedCollections == null)
            {
                return processingCollectionList;
            }

            foreach (var collectionId in request.unProcessedCollections)
            {
                if (this.onProcessingCollectionDict.ContainsKey(collectionId))
                {
                    processingCollectionList.Add(collectionId);
                }
            }
            return processingCollectionList;
        }

        /// <summary>
        /// Drops <paramref name="collectionId"/> from the in-progress set; a missing id is ignored.
        /// </summary>
        private void TryRemoveProcessingCollection(int collectionId)
        {
            bool ignored = false;
            onProcessingCollectionDict.TryRemove(collectionId, out ignored);
        }

        /// <summary>
        /// Queries the processed tree for exactly <paramref name="collectionId"/>.
        /// When the id is found it is also removed from the in-progress set.
        /// </summary>
        /// <param name="collectionId">Collection id to look up.</param>
        /// <returns>True when the processed tree contains the id.</returns>
        internal bool IsCollectionProcessed(int collectionId)
        {
            // min == max == collectionId with count 1: a point lookup for this single id.
            IdTreeNode.GetQueryNodesParam queryParam = new IdTreeNode.GetQueryNodesParam
            {
                count = 1,
                dirMaxLevel = processedMergeSession.calArray.Length - 1,
                maxValue = collectionId,
                minValue = collectionId,
                result = new List<int>(1)
            };
            processedMergeTree.GetExistsNodes(queryParam);

            if (queryParam.result.Count == 0)
            {
                return false;
            }

            TryRemoveProcessingCollection(collectionId);
            return true;
        }

        /// <summary>
        /// Worker loop run on the thread started by the constructor. Until the data node
        /// is disposed it repeatedly visits every pending online link and then sleeps
        /// for <see cref="OnlineMessagePollIntervalMs"/> milliseconds. Exceptions from a
        /// single pass are logged and the loop continues; anything escaping the loop
        /// itself is logged and ends the worker.
        /// </summary>
        internal void HandleOnlineMessage()
        {
            try
            {
                while (!topicDataNode.dataNode.isDisposed)
                {
                    try
                    {
                        if (!ProcessPendingOnlineLinks())
                        {
                            return;
                        }
                    }
                    catch (Exception ex2)
                    {
                        Util.LogException("HandleOnlineMessage2", ex2);
                        if (topicDataNode.dataNode.isDisposed)
                        {
                            return;
                        }
                    }
                    Thread.Sleep(OnlineMessagePollIntervalMs);
                }
            }
            catch (Exception ex)
            {
                Util.LogException("HandleOnlineMessage", ex);
            }
        }

        /// <summary>
        /// Runs one pass over the pending online links: visits each link, records
        /// finished collections as processed, reports them to the master node, and
        /// removes the links whose report succeeded.
        /// </summary>
        /// <returns>False when the data node was found disposed during the pass, true otherwise.</returns>
        private bool ProcessPendingOnlineLinks()
        {
            // ConcurrentDictionary.ToArray() takes a consistent snapshot. LINQ's ToList()
            // reads Count and then copies, which can throw if an entry is added in between.
            KeyValuePair<int, VisitHeadTailLink>[] pending = onlineMessageListDict.ToArray();
            List<int> reportedKeys = new List<int>();

            foreach (var entry in pending)
            {
                if (topicDataNode.dataNode.isDisposed)
                {
                    return false;
                }

                VisitHeadTailLink link = entry.Value;
                if (link.VisitAll(this) != VisitLinkResult.Finished)
                {
                    // Any result other than Finished leaves the link queued for a later pass.
                    continue;
                }

                OnNodeProcessed(link.CollectionId);

                // The link stays queued when the report fails, so it is retried next pass.
                if (NotifyMasterCollectionFinished(link, true))
                {
                    reportedKeys.Add(entry.Key);
                }
            }

            foreach (int collectionId in reportedKeys)
            {
                VisitHeadTailLink removed = null;
                onlineMessageListDict.TryRemove(collectionId, out removed);
            }
            return true;
        }

        /// <summary>
        /// Sends SendInstantCollectionFinished for <paramref name="link"/> to the master
        /// data node proxy through Util.SafeLoopUtilTrue, using the retry count and
        /// retry sleep configured on the cluster context.
        /// </summary>
        /// <param name="link">Link whose cluster id and collection id are reported.</param>
        /// <param name="isSuccessed">Success flag placed in the request.</param>
        /// <returns>The value returned by Util.SafeLoopUtilTrue.</returns>
        private bool NotifyMasterCollectionFinished(VisitHeadTailLink link, bool isSuccessed)
        {
            var context = ClusterContext.Get();
            return Util.SafeLoopUtilTrue((loopI1) => {
                var callResult = topicDataNode.dataNode.masterDataNodeProxy.SendInstantCollectionFinished(new SendInstantCollectionFinishedRequest
                {
                    clusterId = link.ClusterId,
                    collectionId = link.CollectionId,
                    topicEnum = topicEnum,
                    consumerEnum = consumerEnum,
                    isSuccessed = isSuccessed
                });
                return callResult.IsSuccess();
            }, context.GetMaxApiRetryCount(), context.GetRetryFailSleepMS());
        }

        /// <summary>
        /// Queues an online message list for the worker thread, unless the collection
        /// is already marked as in progress.
        /// </summary>
        /// <param name="clusterId">Cluster id stored on the queued link.</param>
        /// <param name="collectionId">Collection id used as the key in both dictionaries.</param>
        /// <param name="topicMessageList">Messages to be visited by the worker.</param>
        /// <returns>True when the list was queued; false when the collection was already in progress.</returns>
        public bool AddOnlineMessgeListToHandle(int clusterId, int collectionId, HeadTailLink<TopicMessage> topicMessageList)
        {
            // A single TryAdd both tests and claims the id, so two concurrent callers
            // can never both succeed for the same collection.
            if (!this.onProcessingCollectionDict.TryAdd(collectionId, true))
            {
                return false;
            }

            VisitHeadTailLink visitLink = new VisitHeadTailLink { ClusterId = clusterId, CollectionId = collectionId, Link = topicMessageList };
            onlineMessageListDict.AddOrUpdate(collectionId, visitLink, (k, v) => visitLink);
            return true;
        }

        /// <summary>
        /// Fetches the stored messages of one collection, hands each to the consumer,
        /// and then records the collection as processed. An exception thrown by the
        /// consumer for one message is logged and the remaining messages are still delivered.
        /// </summary>
        /// <param name="collectionId">Collection to consume.</param>
        /// <returns>
        /// True when the collection was consumed and recorded; false when the node is
        /// disposed, no message list exists, or the collection is already in progress.
        /// </returns>
        private bool ConsumeCollection(int collectionId)
        {
            if (topicDataNode.dataNode.isDisposed)
            {
                return false;
            }

            List<TopicMessage> messageList = topicDataNode.GetClusterMessageList(collectionId);
            if (messageList == null)
            {
                return false;
            }

            // Claim the collection; if another path claimed it first, do not deliver it twice.
            if (!this.onProcessingCollectionDict.TryAdd(collectionId, true))
            {
                return false;
            }

            foreach (var message in messageList)
            {
                if (topicDataNode.dataNode.isDisposed)
                {
                    return false;
                }

                try
                {
                    this.consumer.OnMessage(false, message);
                }
                catch (Exception ex)
                {
                    Util.LogException("ConsumeCollections, DataCollection:" + collectionId, ex);
                    if (topicDataNode.dataNode.isDisposed)
                    {
                        return false;
                    }
                }
            }

            OnNodeProcessed(collectionId);
            return true;
        }

        /// <summary>
        /// Consumes each listed collection that is not already in progress, stopping
        /// early once the data node is disposed.
        /// </summary>
        /// <param name="collectionList">Collection ids to consume; null is treated as empty.</param>
        /// <returns>The ids that were consumed successfully, in input order.</returns>
        public List<int> ConsumeCollections(List<int> collectionList)
        {
            List<int> consumeSuccessedList = new List<int>();
            if (collectionList == null)
            {
                return consumeSuccessedList;
            }

            foreach (var collectionId in collectionList)
            {
                if (topicDataNode.dataNode.isDisposed)
                {
                    return consumeSuccessedList;
                }
                if (this.onProcessingCollectionDict.ContainsKey(collectionId))
                {
                    continue;
                }

                if (ConsumeCollection(collectionId))
                {
                    consumeSuccessedList.Add(collectionId);
                }
                else if (topicDataNode.dataNode.isDisposed)
                {
                    return consumeSuccessedList;
                }
            }
            return consumeSuccessedList;
        }

        /// <summary>
        /// Records <paramref name="processedCollectionId"/> in the processed tree, saves
        /// the changed tree nodes, removes the id from the in-progress set, and asks the
        /// topic node to delete the collection if all consumers have consumed it.
        /// </summary>
        private void OnNodeProcessed(int processedCollectionId)
        {
            var processedTreeNode = IdTreeNode.NewIdTree(processedMergeSession.calArray, processedCollectionId, ClusterContext.Get().GetProcessedMaxValue());
            this.processedMergeTree.MergeNode(processedTreeNode, processedMergeContext);
            this.processedMergeTree.SaveChangedNodes(processedMergeSession);
            TryRemoveProcessingCollection(processedCollectionId);
            topicDataNode.DeleteCollectionIfAllConsumed(processedCollectionId);
        }

        /// <summary>
        /// Merges the processed tree carried by <paramref name="request"/> into the local
        /// tree, saves the changed nodes, and for every id in
        /// <c>request.toDeleteCollections</c> clears its in-progress mark and asks the
        /// topic node to delete the collection if all consumers have consumed it.
        /// </summary>
        /// <param name="request">Sync request with the tree node to merge and optional ids to delete.</param>
        public void SendSyncProcessedTree(SendSyncProcessedTreeRequest request)
        {
            this.processedMergeTree.MergeNode(request.processedTreeNode, processedMergeContext);
            this.processedMergeTree.SaveChangedNodes(processedMergeSession);

            if (request.toDeleteCollections == null)
            {
                return;
            }

            foreach (int toDeleteCollectionId in request.toDeleteCollections)
            {
                TryRemoveProcessingCollection(toDeleteCollectionId);
                topicDataNode.DeleteCollectionIfAllConsumed(toDeleteCollectionId);
            }
        }
    }
}

